嗨嗨!大家好!歡迎來到 Rust 三十天挑戰的第十六天!
昨天我們學習了閉包和迭代器這些函數式程式設計的強大工具,今天我們要踏入另一個讓 Rust 引以為傲的領域:併發程式設計!
老實說,多執行緒一直是我在寫程式時既愛又怕的領域。愛的是它能讓程式跑得更快,怕的是那些詭異的競態條件(race condition)和難以除錯的死鎖問題。在 C# 中,雖然有 lock
、Task
這些工具,但還是經常會遇到一些讓人頭痛的併發問題。
但 Rust 不一樣!它透過所有權系統和型別系統,在編譯時期就能防止大部分的併發問題,實現了「無懼併發」(Fearless Concurrency)。這聽起來很神奇對吧?讓我們一起來看看 Rust 是如何做到的!
在現代多核心處理器當道的時代,如果我們的程式只能使用一個 CPU 核心,那就太浪費了。併發程式設計讓我們可以:
在深入程式碼之前,讓我們先理解 Rust 對併發的設計哲學:
thread::spawn
讓我們從最基本的執行緒建立開始:
use std::thread;
use std::time::Duration;
fn main() {
println!("主執行緒開始");
// 建立一個新的執行緒
thread::spawn(|| {
for i in 1..=5 {
println!("子執行緒:計數 {}", i);
thread::sleep(Duration::from_millis(500));
}
println!("子執行緒結束");
});
// 主執行緒的工作
for i in 1..=3 {
println!("主執行緒:計數 {}", i);
thread::sleep(Duration::from_millis(300));
}
println!("主執行緒結束");
}
咦?如果你執行這個程式,你可能會發現子執行緒還沒執行完,程式就結束了!這是因為當主執行緒結束時,整個程式就會終止,不管子執行緒是否還在執行。
JoinHandle
要解決這個問題,我們需要使用 join()
方法:
use std::thread;
use std::time::Duration;
fn main() {
println!("主執行緒開始");
// spawn 會回傳一個 JoinHandle
let handle = thread::spawn(|| {
for i in 1..=5 {
println!("子執行緒:計數 {}", i);
thread::sleep(Duration::from_millis(500));
}
println!("子執行緒結束");
});
// 主執行緒的工作
for i in 1..=3 {
println!("主執行緒:計數 {}", i);
thread::sleep(Duration::from_millis(300));
}
// 等待子執行緒完成
handle.join().unwrap();
println!("主執行緒結束");
}
use std::thread;
fn main() {
let data = "Hello from main thread!".to_string();
// 使用 move 關鍵字將 data 的所有權移動到執行緒中
let handle = thread::spawn(move || {
println!("子執行緒收到:{}", data);
data.len() // 回傳資料長度
});
// data 已經被移動,這裡無法再使用
// println!("{}", data); // 這會編譯錯誤
// 從執行緒取得回傳值
let result = handle.join().unwrap();
println!("字串長度:{}", result);
}
Rust 提供了強大的通道機制來讓執行緒間安全地傳遞訊息:
use std::sync::mpsc; // multiple producer, single consumer
use std::thread;
use std::time::Duration;
fn main() {
// 建立通道,取得發送者 (tx) 和接收者 (rx)
let (tx, rx) = mpsc::channel();
// 發送者執行緒
thread::spawn(move || {
let messages = vec![
"第一個訊息",
"第二個訊息",
"第三個訊息",
];
for message in messages {
tx.send(message).unwrap();
thread::sleep(Duration::from_secs(1));
}
// 發送完畢,關閉通道
println!("發送者:所有訊息已發送");
});
// 接收者(主執行緒)
for received in rx {
println!("接收到:{}", received);
}
println!("所有訊息接收完畢");
}
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
// 複製發送者來建立多個發送者
for id in 1..=3 {
let sender = tx.clone();
thread::spawn(move || {
for i in 1..=3 {
let message = format!("執行緒 {} 的訊息 {}", id, i);
sender.send(message).unwrap();
}
});
}
// 關閉原始發送者
drop(tx);
// 接收所有訊息
for received in rx {
println!("收到:{}", received);
}
}
有時候我們需要多個執行緒共享同一份資料,這時就需要使用互斥鎖(Mutex):
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
// Arc:原子參考計數,允許多個執行緒擁有同一份資料
// Mutex:互斥鎖,確保同時只有一個執行緒能修改資料
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for i in 0..10 {
let counter_clone = Arc::clone(&counter);
let handle = thread::spawn(move || {
for _ in 0..1000 {
// 取得鎖並修改資料
let mut num = counter_clone.lock().unwrap();
*num += 1;
}
println!("執行緒 {} 完成", i);
});
handles.push(handle);
}
// 等待所有執行緒完成
for handle in handles {
handle.join().unwrap();
}
println!("最終計數:{}", *counter.lock().unwrap());
}
讓我們結合昨天學到的迭代器和今天的併發知識,來處理一個實際問題:
use std::sync::{Arc, Mutex};
use std::thread;
// 模擬一個計算密集的函式
fn expensive_calculation(n: u64) -> u64 {
// 計算費波那契數列
if n <= 1 {
n
} else {
expensive_calculation(n - 1) + expensive_calculation(n - 2)
}
}
fn main() {
let numbers = vec![30, 31, 32, 33, 34];
let results = Arc::new(Mutex::new(Vec::new()));
let mut handles = vec![];
println!("開始並行計算...");
for number in numbers {
let results_clone = Arc::clone(&results);
let handle = thread::spawn(move || {
let result = expensive_calculation(number);
println!("fibonacci({}) = {}", number, result);
// 將結果加入共享的向量
let mut results_guard = results_clone.lock().unwrap();
results_guard.push((number, result));
});
handles.push(handle);
}
// 等待所有計算完成
for handle in handles {
handle.join().unwrap();
}
// 顯示所有結果
let final_results = results.lock().unwrap();
println!("\n所有結果:");
for (n, result) in final_results.iter() {
println!("fibonacci({}) = {}", n, result);
}
}
讓我們實作一個簡單的工作竊取模式,這是現代併發系統常用的技術:
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
struct TaskQueue {
tasks: Mutex<Vec<Box<dyn Fn() + Send>>>,
}
impl TaskQueue {
fn new() -> Self {
TaskQueue {
tasks: Mutex::new(Vec::new()),
}
}
fn add_task<F>(&self, task: F)
where
F: Fn() + Send + 'static,
{
let mut tasks = self.tasks.lock().unwrap();
tasks.push(Box::new(task));
}
fn execute_next_task(&self) -> bool {
let mut tasks = self.tasks.lock().unwrap();
if let Some(task) = tasks.pop() {
drop(tasks); // 釋放鎖
task();
true
} else {
false
}
}
}
fn main() {
let task_queue = Arc::new(TaskQueue::new());
// 添加一些任務
for i in 1..=10 {
let task = move || {
println!("執行任務 {}", i);
thread::sleep(Duration::from_millis(100));
};
task_queue.add_task(task);
}
// 建立工作執行緒
let mut handles = vec![];
for worker_id in 1..=3 {
let queue_clone = Arc::clone(&task_queue);
let handle = thread::spawn(move || {
loop {
if !queue_clone.execute_next_task() {
println!("工作者 {} 沒有更多任務", worker_id);
break;
}
}
});
handles.push(handle);
}
// 等待所有工作完成
for handle in handles {
handle.join().unwrap();
}
println!("所有任務執行完畢");
}
// 推薦:使用通道
let (tx, rx) = mpsc::channel();
// 較少使用:共享狀態
let shared_data = Arc::new(Mutex::new(data));
// 不好的例子
let data = mutex.lock().unwrap();
expensive_operation(); // 長時間持有鎖
*data += 1;
// 好的例子
let value = {
let data = mutex.lock().unwrap();
*data
}; // 鎖在這裡釋放
expensive_operation();
{
let mut data = mutex.lock().unwrap();
*data = value + 1;
} // 鎖快速釋放
// 可能導致死鎖
let _lock1 = mutex1.lock().unwrap();
let _lock2 = mutex2.lock().unwrap(); // 危險!
// 更安全的方式:確保鎖的順序一致
// 或使用更高階的併發原語
use std::thread;
fn main() {
let data = vec![1, 2, 3];
thread::spawn(|| {
// println!("{:?}", data); // 編譯錯誤!沒有 move
});
// Rust 會強制你明確處理所有權:
thread::spawn(move || {
println!("{:?}", data); // 正確!
});
}
use std::sync::Mutex;
fn main() {
let mutex = Mutex::new(5);
{
let data1 = mutex.lock().unwrap();
// let data2 = mutex.lock().unwrap(); // 這會阻塞,不是編譯錯誤
// 但 Rust 的型別系統防止了更危險的情況
}
}
// Send:型別可以在執行緒間轉移所有權
// Sync:型別可以被多個執行緒同時參考
use std::rc::Rc;
use std::thread;
fn main() {
let rc = Rc::new(5);
// 這會編譯錯誤,因為 Rc<T> 沒有實作 Send
// thread::spawn(move || {
// println!("{}", rc);
// });
// 應該使用 Arc<T>
use std::sync::Arc;
let arc = Arc::new(5);
thread::spawn(move || {
println!("{}", arc);
});
}
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Instant;
fn cpu_intensive_task(n: u32) -> u32 {
(0..n).map(|i| i * i).sum()
}
fn main() {
let data_size = 1_000_000;
let thread_count = 4;
let chunk_size = data_size / thread_count;
// 單執行緒版本
let start = Instant::now();
let single_result = cpu_intensive_task(data_size);
let single_duration = start.elapsed();
// 多執行緒版本
let start = Instant::now();
let results = Arc::new(Mutex::new(Vec::new()));
let mut handles = vec![];
for i in 0..thread_count {
let start_idx = i * chunk_size;
let end_idx = if i == thread_count - 1 {
data_size
} else {
(i + 1) * chunk_size
};
let results_clone = Arc::clone(&results);
let handle = thread::spawn(move || {
let partial_result = cpu_intensive_task(end_idx - start_idx);
let mut results_guard = results_clone.lock().unwrap();
results_guard.push(partial_result);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
let multi_result: u32 = results.lock().unwrap().iter().sum();
let multi_duration = start.elapsed();
println!("單執行緒結果:{}", single_result);
println!("多執行緒結果:{}", multi_result);
println!("單執行緒耗時:{:?}", single_duration);
println!("多執行緒耗時:{:?}", multi_duration);
if single_duration > multi_duration {
let speedup = single_duration.as_secs_f64() / multi_duration.as_secs_f64();
println!("加速比:{:.2}x", speedup);
}
}
今天我們深入探討了 Rust 併發程式設計的核心概念:
執行緒管理:
thread::spawn
:建立新執行緒JoinHandle
:等待執行緒完成並取得結果move
閉包:將資料所有權轉移到執行緒通訊機制:
共享狀態:
設計原則:
為什麼重要?
Rust 的併發模型真的讓人印象深刻。它讓我們能夠享受併發程式設計帶來的效能提升,同時又不用擔心那些傳統併發程式設計中的陷阱。
為了鞏固今天的學習,嘗試實作一個並行網頁爬蟲:
功能需求:
技術要求:
技術提示:
use std::sync::{Arc, Mutex, mpsc};
use std::thread;
use std::time::Duration;
struct CrawlResult {
url: String,
title: Option<String>,
link_count: usize,
success: bool,
}
struct WebCrawler {
max_workers: usize,
request_delay: Duration,
}
impl WebCrawler {
fn new(max_workers: usize) -> Self {
WebCrawler {
max_workers,
request_delay: Duration::from_millis(100),
}
}
fn crawl_urls(&self, urls: Vec<String>) -> Vec<CrawlResult> {
// 實作並行爬蟲邏輯
// 1. 建立工作執行緒池
// 2. 使用通道分發 URL 任務
// 3. 收集爬取結果
// 4. 回傳完整結果
vec![]
}
fn crawl_page(&self, url: String) -> CrawlResult {
// 模擬網頁爬取(實際專案中會使用 HTTP 客戶端)
// 這裡可以使用 thread::sleep 模擬網路延遲
thread::sleep(self.request_delay);
CrawlResult {
url,
title: Some("模擬標題".to_string()),
link_count: 10,
success: true,
}
}
}
fn main() {
let urls = vec![
"https://example.com".to_string(),
"https://rust-lang.org".to_string(),
"https://github.com".to_string(),
// 添加更多 URL...
];
let crawler = WebCrawler::new(3);
let results = crawler.crawl_urls(urls);
// 顯示統計
let successful = results.iter().filter(|r| r.success).count();
println!("爬取完成:{}/{} 成功", successful, results.len());
}
這個挑戰將讓你綜合運用執行緒、通道和共享狀態來解決實際問題。重點是要設計一個既高效又安全的併發架構。
明天我們將學習 非同步程式設計 (Async/Await),探討 Rust 如何處理 I/O 密集型任務。結合今天學到的併發知識,我們將能夠建立真正高效能的網路服務!
如果在實作過程中遇到任何問題,歡迎在留言區討論。併發程式設計確實需要一些時間來掌握,但 Rust 的安全保證讓這個過程變得更加愉快!
我們明天見!